# 01 Moving Average (MA) Model

## Goals

- Define the MA(\(q\)) model (and how it differs from a rolling mean).
- Explain the generative “algorithm” and the math behind it.
- Understand assumptions, requirements, and invertibility.
- Implement MA simulation/inversion in NumPy.
- Visualize the effect of order, impulse response, and residual autocorrelation.

## Prerequisites

- Basic probability (expectation, variance, covariance)
- Time series basics (lags, autocorrelation)
- Python: NumPy + Plotly
## Model definition (MA(\(q\)))

An MA(\(q\)) model is a finite linear filter of white-noise shocks. It is not the same thing as the rolling/simple moving average used for smoothing.

### Time-domain form

\[
X_t = \mu + \varepsilon_t + \theta_1 \varepsilon_{t-1} + \cdots + \theta_q \varepsilon_{t-q}, \qquad \varepsilon_t \sim \text{WN}(0,\sigma^2).
\]

Convenient shorthand: set \(\theta_0 = 1\) so

\[
X_t - \mu = \sum_{i=0}^q \theta_i \varepsilon_{t-i}.
\]

### Backshift-operator form

Let \(B X_t = X_{t-1}\). Then

\[
X_t - \mu = \Theta(B)\,\varepsilon_t, \qquad \Theta(B) = 1 + \theta_1 B + \cdots + \theta_q B^q.
\]
## What the “algorithm” is

### 1) Generating (simulating) an MA(\(q\))

For each time step \(t\):

- Draw a new shock \(\varepsilon_t\).
- Output \(X_t\) as a weighted sum of the current and the last \(q\) shocks.

In other words, MA(\(q\)) is an FIR (finite impulse response) filter applied to white noise.
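Because the process is just an FIR filter, the generating step can be sketched with `np.convolve` (a minimal example; the `theta` values and sample size are illustrative, not from this chapter's later experiments):

```python
import numpy as np

rng = np.random.default_rng(0)
mu, theta = 0.0, np.array([0.8, 0.3])  # illustrative MA(2) coefficients
n = 5
eps = rng.normal(0.0, 1.0, size=n)

# FIR filtering: convolve the shocks with the kernel (theta_0, theta_1, ..., theta_q)
kernel = np.r_[1.0, theta]
x = mu + np.convolve(eps, kernel)[:n]  # truncate to the first n outputs

# Same result written as the explicit weighted sum of current and past shocks
x_loop = np.array([
    mu + sum(kernel[k] * eps[t - k] for k in range(len(kernel)) if t - k >= 0)
    for t in range(n)
])
assert np.allclose(x, x_loop)
```

The convolution form makes the "finite filter" interpretation concrete: each output mixes at most \(q+1\) shocks.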
### 2) Using MA(\(q\)) as a statistical model (Box–Jenkins view)

- Identify \(q\): the ACF often shows a sharp cutoff after lag \(q\) (heuristic).
- Estimate parameters: typically via (Gaussian) maximum likelihood / innovations algorithms.
- Diagnose fit: residuals (estimated shocks) should look like white noise; the residual ACF should be near 0.
- Forecast: beyond \(q\) steps, the optimal forecast reverts to \(\mu\) because future shocks have mean 0.
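The forecasting point can be made concrete for MA(1). A minimal sketch, with illustrative parameter values and assuming the most recent shock \(\varepsilon_t\) is known (e.g. recovered by inverting the model):

```python
# MA(1) forecasting sketch: x_t = mu + eps_t + theta * eps_{t-1}.
# Values below are illustrative, not estimates from data.
mu, theta = 10.0, 0.6
eps_t = 0.5  # most recent (known) shock

def forecast(h):
    """h-step-ahead conditional mean: future shocks have expectation 0."""
    if h == 1:
        return mu + theta * eps_t  # eps_t still echoes one step ahead
    return mu                      # beyond q = 1 steps, the forecast is just mu

assert abs(forecast(1) - 10.3) < 1e-12
assert forecast(5) == 10.0
```

This is why MA forecasts "flatten" quickly: after \(q\) steps every shock in the forecast horizon is unobserved and averages out to zero.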
## How it works mathematically (step-by-step)

Write the centered process \(Y_t = X_t - \mu = \sum_{i=0}^q \theta_i \varepsilon_{t-i}\).

### Mean

\[
\mathbb{E}[X_t] = \mu + \sum_{i=0}^q \theta_i\,\mathbb{E}[\varepsilon_{t-i}] = \mu.
\]

### Autocovariance

For lag \(k \ge 0\):

\[
\begin{aligned}
\gamma(k) &= \text{Cov}(Y_t, Y_{t-k}) \\
&= \text{Cov}\Big(\sum_{i=0}^q \theta_i \varepsilon_{t-i},\ \sum_{j=0}^q \theta_j \varepsilon_{t-k-j}\Big) \\
&= \sum_{i=0}^q \sum_{j=0}^q \theta_i\theta_j\,\text{Cov}(\varepsilon_{t-i}, \varepsilon_{t-k-j}).
\end{aligned}
\]

Because \(\{\varepsilon_t\}\) is white noise, \(\text{Cov}(\varepsilon_{t-i}, \varepsilon_{t-k-j}) = \sigma^2\) only when \(t-i = t-k-j\) (i.e. \(j = i-k\)); otherwise it is 0. That leaves only the overlapping terms:

\[
\gamma(k) = \sigma^2 \sum_{i=0}^{q-k} \theta_i\theta_{i+k}, \qquad k = 0, 1, \dots, q.
\]

For \(|k| > q\) there is no overlap in shocks, so \(\gamma(k) = 0\).

### Autocorrelation (ACF)

\[
\rho(k) = \frac{\gamma(k)}{\gamma(0)}.
\]

Key takeaway: the theoretical ACF of MA(\(q\)) is exactly 0 for lags \(|k| > q\) (“cutoff”).
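A quick numerical sanity check of the formulas above, for an MA(1) with an illustrative \(\theta = 0.5\) (where \(\rho(1) = \theta/(1+\theta^2)\) follows directly from the overlap sum):

```python
import numpy as np

theta = np.array([0.5])          # illustrative MA(1)
sigma2 = 1.0
theta_full = np.r_[1.0, theta]   # prepend theta_0 = 1

def gamma(k):
    """gamma(k) = sigma^2 * sum_{i=0}^{q-k} theta_i * theta_{i+k}; 0 beyond lag q."""
    q = len(theta)
    if k > q:
        return 0.0
    return sigma2 * float(np.sum(theta_full[: q + 1 - k] * theta_full[k:]))

rho1 = gamma(1) / gamma(0)
# For MA(1): rho(1) = theta / (1 + theta^2) = 0.5 / 1.25 = 0.4
assert np.isclose(rho1, 0.4)
assert gamma(2) == 0.0           # cutoff: no shock overlap beyond lag q
```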
## Impulse response

If a single unit shock happens at time \(t\) (\(\varepsilon_t = 1\), all other shocks 0), then

\[
X_{t+h} - \mu = \theta_h \quad \text{for } h = 0, 1, \dots, q, \qquad \text{and } 0 \text{ thereafter},
\]

where \(\theta_0 = 1\). So MA models have finite shock memory.
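Finite shock memory can be verified directly by filtering a unit impulse (illustrative coefficients):

```python
import numpy as np

theta = np.array([0.8, 0.3])              # illustrative MA(2)
kernel = np.r_[1.0, theta]                # (theta_0, theta_1, theta_2)

eps = np.zeros(8)
eps[0] = 1.0                              # single unit shock at t = 0

x = np.convolve(eps, kernel)[: eps.size]  # MA filter applied to the impulse

# The response equals the coefficients for h = 0..q, then is exactly 0.
assert np.allclose(x[:3], [1.0, 0.8, 0.3])
assert np.allclose(x[3:], 0.0)
```

Contrast this with AR models, whose impulse responses decay geometrically but never hit exactly zero.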
## How and when it is used (and what it models)

MA(\(q\)) is used when dependence comes from short-lived disturbances rather than long-run persistence.

Typical scenarios / incident and noise patterns:

- Measurement noise with short carryover (sensor “ringing”, smoothing/filtering artifacts).
- One-off incidents that affect a few subsequent observations (temporary outage recovery, delayed reporting).
- Aggregation/batching effects (the value at time \(t\) mixes several recent random contributions).
- Microstructure noise in finance (e.g., bid–ask bounce creates short-range negative autocorrelation).

Practical modeling:

- Standalone MA models are common for short-memory series.
- More often, MA terms appear inside ARMA/ARIMA/SARIMA models to capture short-run shock structure.
## Model assumptions and requirements

Core assumptions (for the MA part):

- Innovations \(\varepsilon_t\) have zero mean, constant variance \(\sigma^2\), and are uncorrelated over time (often assumed i.i.d.).
- Parameters \(\mu, \theta_1, \dots, \theta_q\) are constant over the sample.
- The relationship is linear and time-invariant.

Notes:

- Stationarity: any finite MA(\(q\)) with finite-variance shocks is (weakly) stationary; no restrictions on \(\theta\) are needed for stationarity.
- For likelihood-based estimation and standard errors, Gaussian shocks are often assumed; without Gaussianity, Gaussian MLE is still a common quasi-MLE.
- Estimation typically needs a sample size meaningfully larger than \(q\), and diagnostics rely on having enough data to detect residual autocorrelation.
## Invertibility

MA parameters are not always uniquely identified from second-order moments unless we impose invertibility.

Define the MA polynomial

\[
\Theta(z) = 1 + \theta_1 z + \cdots + \theta_q z^q.
\]

The MA(\(q\)) is invertible if all roots of \(\Theta(z)\) lie outside the unit circle:

\[
|z_i| > 1 \quad \text{for every root } z_i.
\]

Why it matters:

- It guarantees a stable inverse filter, so shocks can be recovered from data: \(\varepsilon_t = \Theta(B)^{-1}(X_t - \mu)\), an AR(\(\infty\)) expansion with absolutely summable coefficients.
- It makes the MA representation unique (no parameter aliases).
### MA(1) intuition

For MA(1): \(X_t - \mu = (1 + \theta B)\varepsilon_t\). If \(|\theta| < 1\), we can expand the inverse as a convergent series:

\[
(1 + \theta B)^{-1} = 1 - \theta B + \theta^2 B^2 - \theta^3 B^3 + \cdots.
\]

If \(|\theta| \ge 1\), this series does not converge, and computing shocks from observations becomes unstable.
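The stability claim can be demonstrated numerically. In this sketch (illustrative \(\theta\) and seed), we simulate an MA(1) with a pre-sample shock the inversion does not know about; because \(|\theta| < 1\), the resulting error in the recovered shocks decays geometrically:

```python
import numpy as np

rng = np.random.default_rng(1)
theta, n = 0.6, 500              # |theta| < 1: invertible
eps = rng.normal(size=n)
eps_pre = 5.0                    # pre-sample shock, unknown to the inversion below

# Simulate MA(1): x_t = eps_t + theta * eps_{t-1}
x = eps + theta * np.r_[eps_pre, eps[:-1]]

# Invert with the (wrong) assumption eps_{-1} = 0:
# eps_t = x_t - theta * eps_{t-1}, i.e. the AR(inf) recursion run exactly.
eps_hat = np.zeros(n)
for t in range(n):
    eps_hat[t] = x[t] - theta * (eps_hat[t - 1] if t > 0 else 0.0)

# The initial error theta * eps_pre = 3.0 shrinks by a factor of theta each step,
# so the recovered shocks converge to the true ones.
assert abs(eps_hat[0] - eps[0]) > 1.0
assert np.allclose(eps_hat[50:], eps[50:], atol=1e-6)
```

With \(|\theta| \ge 1\) the same recursion would amplify the initial error instead of damping it, which is exactly the instability the text describes.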
## Intuition behind the error terms (innovations)

The shocks \(\varepsilon_t\) represent new information that was not predictable from the past. An MA model says: the observation today is the current shock plus echoes of a few recent shocks.

Abstract examples:

- Echo/ringing: a calibration event perturbs the next few readings.
- Delayed reporting: one random reporting error spills into a couple of subsequent periods.
- Batching: what you record at \(t\) includes several recent random contributions.
```python
import os

import numpy as np
import plotly.graph_objects as go
import plotly.io as pio
from plotly.subplots import make_subplots

pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")
np.set_printoptions(precision=4, suppress=True)
rng = np.random.default_rng(42)
```
```python
def ma_filter(eps, theta, mu=0.0):
    """Apply the MA filter: x_t = mu + eps_t + sum_k theta_k * eps_{t-k}."""
    eps = np.asarray(eps, dtype=float)
    theta = np.asarray(theta, dtype=float)
    q = theta.size
    n = eps.size
    x = np.empty(n, dtype=float)
    for t in range(n):
        value = eps[t]
        for k in range(1, q + 1):
            if t - k >= 0:
                value += theta[k - 1] * eps[t - k]
        x[t] = mu + value
    return x
```
```python
def simulate_ma(
    theta,
    n,
    mu=0.0,
    sigma=1.0,
    rng=None,
    burnin=200,
):
    """Simulate n observations from an MA(q) process, discarding a burn-in."""
    if rng is None:
        rng = np.random.default_rng()
    eps = rng.normal(0.0, sigma, size=n + burnin)
    x = ma_filter(eps, theta, mu=mu)
    return x[burnin:], eps[burnin:]
```
```python
def innovations_from_ma(x, theta, mu=0.0):
    """Recover shocks by inverting the MA filter (assumes an invertible theta)."""
    x = np.asarray(x, dtype=float)
    theta = np.asarray(theta, dtype=float)
    q = theta.size
    n = x.size
    eps = np.zeros(n, dtype=float)
    for t in range(n):
        value = x[t] - mu
        for k in range(1, q + 1):
            if t - k >= 0:
                value -= theta[k - 1] * eps[t - k]
        eps[t] = value
    return eps
```
```python
def sample_acf(x, max_lag):
    """Sample autocorrelation function at lags 0..max_lag."""
    x = np.asarray(x, dtype=float)
    x = x - x.mean()
    denom = float(np.dot(x, x))
    if denom == 0.0:
        return np.r_[1.0, np.zeros(max_lag)]
    acf = np.empty(max_lag + 1, dtype=float)
    acf[0] = 1.0
    for k in range(1, max_lag + 1):
        acf[k] = float(np.dot(x[k:], x[:-k]) / denom)
    return acf
```
```python
def theoretical_acf_ma(theta, max_lag):
    """Theoretical MA(q) ACF: rho(k) = gamma(k) / gamma(0), zero beyond lag q."""
    theta = np.asarray(theta, dtype=float)
    q = theta.size
    theta_full = np.r_[1.0, theta]
    gamma = np.zeros(max_lag + 1, dtype=float)
    for k in range(0, max_lag + 1):
        if k <= q:
            gamma[k] = float(np.sum(theta_full[: q + 1 - k] * theta_full[k:]))
    return gamma / gamma[0]
```
```python
def is_invertible(theta, tol=1e-12):
    """Check that all roots of Theta(z) = 1 + theta_1 z + ... + theta_q z^q lie outside the unit circle."""
    theta = np.asarray(theta, dtype=float)
    if theta.size == 0:
        return True
    coeffs = np.r_[theta[::-1], 1.0]  # np.roots expects highest-degree coefficient first
    roots = np.roots(coeffs)
    return bool(np.all(np.abs(roots) > 1 + tol))
```
```python
def fit_ma1_via_acf(x):
    """Method-of-moments MA(1) fit: solve rho(1) = theta / (1 + theta^2) for theta."""
    x = np.asarray(x, dtype=float)
    mu_hat = float(x.mean())
    x0 = x - mu_hat
    r1 = float(sample_acf(x0, 1)[1])
    r1 = float(np.clip(r1, -0.499, 0.499))  # |rho(1)| <= 0.5 for any MA(1)
    if abs(r1) < 1e-12:
        theta_hat = 0.0
    else:
        disc = max(0.0, 1.0 - 4.0 * r1 * r1)
        s = disc ** 0.5
        theta_a = (1.0 + s) / (2.0 * r1)
        theta_b = (1.0 - s) / (2.0 * r1)
        theta_hat = min([theta_a, theta_b], key=abs)  # pick the invertible root (|theta| < 1)
    theta_hat = float(np.clip(theta_hat, -0.99, 0.99))
    var_hat = float(np.mean(x0 * x0))
    sigma2_hat = var_hat / (1.0 + theta_hat * theta_hat)
    sigma_hat = float(sigma2_hat ** 0.5)
    return mu_hat, np.array([theta_hat]), sigma_hat
```
```python
def fit_ma_via_acf_random_search(
    x,
    q,
    max_lag=20,
    n_candidates=20000,
    seed=0,
):
    """Fit an MA(q) by matching the sample ACF over random invertible candidates."""
    x = np.asarray(x, dtype=float)
    mu_hat = float(x.mean())
    x0 = x - mu_hat
    target = sample_acf(x0, max_lag)
    rng_local = np.random.default_rng(seed)
    best_theta = None
    best_loss = np.inf
    for _ in range(n_candidates):
        cand = rng_local.uniform(-0.99, 0.99, size=q)
        if not is_invertible(cand):
            continue
        theo = theoretical_acf_ma(cand, max_lag)
        loss = float(np.mean((target[1:] - theo[1:]) ** 2))
        if loss < best_loss:
            best_loss = loss
            best_theta = cand
    if best_theta is None:
        best_theta = np.zeros(q, dtype=float)
    var_hat = float(np.mean(x0 * x0))
    sigma2_hat = var_hat / float(np.sum(np.r_[1.0, best_theta] ** 2))
    sigma_hat = float(sigma2_hat ** 0.5)
    return mu_hat, best_theta, sigma_hat
```
```python
orders = {
    "MA(0)": np.array([]),
    "MA(1)": np.array([0.8]),
    "MA(2)": np.array([0.8, 0.3]),
    "MA(5)": np.array([0.8, 0.3, 0.1, 0.05, 0.02]),
}
burnin = 50
n = 250
max_lag = 20
eps = rng.normal(0.0, 1.0, size=n + burnin)

fig = make_subplots(
    rows=2,
    cols=len(orders),
    column_titles=[
        f"{name} (invertible={is_invertible(theta)})" for name, theta in orders.items()
    ],
    row_titles=["sample path", "sample ACF"],
    vertical_spacing=0.15,
)
for col, (name, theta) in enumerate(orders.items(), start=1):
    x = ma_filter(eps, theta)[burnin:]
    fig.add_trace(
        go.Scatter(y=x, mode="lines", name=name, showlegend=False),
        row=1,
        col=col,
    )
    acf = sample_acf(x, max_lag)
    fig.add_trace(
        go.Bar(x=np.arange(max_lag + 1), y=acf, showlegend=False),
        row=2,
        col=col,
    )
    fig.update_xaxes(title_text="t", row=1, col=col)
    fig.update_xaxes(title_text="lag", row=2, col=col)
    fig.update_yaxes(title_text="X_t", row=1, col=col)
    fig.update_yaxes(title_text="ACF", row=2, col=col, range=[-1, 1])
fig.update_layout(
    height=650,
    width=1150,
    title_text="Effect of MA order: sample paths and ACF cutoff",
)
fig.show()
```
```python
theta = orders["MA(5)"]
impulse = np.r_[1.0, theta]
lags = np.arange(impulse.size)

fig = go.Figure(go.Bar(x=lags, y=impulse))
fig.update_layout(
    title="MA impulse response (finite-memory filter coefficients)",
    xaxis_title="lag h",
    yaxis_title="response weight (θ_h, with θ_0=1)",
)
fig.show()
```
```python
theta_true = np.array([0.8, 0.3])
x, _eps_true = simulate_ma(theta_true, n=1000, sigma=1.0, rng=rng, burnin=200)

mu_1, theta_1, sigma_1 = fit_ma1_via_acf(x)
mu_2, theta_2, sigma_2 = fit_ma_via_acf_random_search(
    x, q=2, max_lag=20, n_candidates=25000, seed=123
)

resid_1 = innovations_from_ma(x, theta_1, mu=mu_1)
resid_2 = innovations_from_ma(x, theta_2, mu=mu_2)

max_lag = 20
acf_resid_1 = sample_acf(resid_1, max_lag)
acf_resid_2 = sample_acf(resid_2, max_lag)
conf = 1.96 / np.sqrt(x.size)
lags = np.arange(1, max_lag + 1)

fig = make_subplots(
    rows=1,
    cols=2,
    column_titles=["Residual ACF after fitting MA(1)", "Residual ACF after fitting MA(2)"],
)
for col, acf_resid in enumerate([acf_resid_1, acf_resid_2], start=1):
    fig.add_trace(go.Bar(x=lags, y=acf_resid[1:], showlegend=False), row=1, col=col)
    fig.add_trace(
        go.Scatter(
            x=[lags[0], lags[-1]],
            y=[conf, conf],
            mode="lines",
            line=dict(color="crimson", dash="dash"),
            showlegend=False,
        ),
        row=1,
        col=col,
    )
    fig.add_trace(
        go.Scatter(
            x=[lags[0], lags[-1]],
            y=[-conf, -conf],
            mode="lines",
            line=dict(color="crimson", dash="dash"),
            showlegend=False,
        ),
        row=1,
        col=col,
    )
    fig.update_xaxes(title_text="lag", row=1, col=col)
    fig.update_yaxes(title_text="ACF", row=1, col=col, range=[-1, 1])
fig.update_layout(
    height=350,
    width=950,
    title_text="Residual autocorrelation diagnostic (white-noise residuals ≈ 0)",
)
fig.show()

(mu_1, theta_1, sigma_1), (mu_2, theta_2, sigma_2)
```
```
((-0.05216475228517619, array([0.9387]), 1.0124577283931864),
 (-0.05216475228517619, array([0.8422, 0.4018]), 1.0152504473095068))
```
## Pitfalls and diagnostics

- Don’t confuse MA(\(q\)) with the rolling mean used for smoothing.
- MA(\(q\)) is always stationary, but invertibility matters for identification and for stable residual computation.
- The ACF cutoff is a heuristic; always check residual autocorrelation (and, in practice, tests like Ljung–Box).
- MA parameters can be hard to estimate in small samples; likelihood surfaces can be flat and multimodal.
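The Ljung–Box statistic mentioned above can be sketched in a few lines of NumPy (a minimal illustration; in practice one would use a library routine such as statsmodels' `acorr_ljungbox` and compare \(Q\) to a chi-squared quantile):

```python
import numpy as np

def ljung_box_q(resid, max_lag):
    """Ljung-Box Q statistic: large Q suggests the residuals are not white noise."""
    resid = np.asarray(resid, dtype=float)
    n = resid.size
    r = resid - resid.mean()
    denom = float(np.dot(r, r))
    acf = np.array([np.dot(r[k:], r[:-k]) / denom for k in range(1, max_lag + 1)])
    # Q = n (n + 2) * sum_k acf_k^2 / (n - k), approx. chi-squared(max_lag) under H0
    return n * (n + 2) * float(np.sum(acf ** 2 / (n - np.arange(1, max_lag + 1))))

rng = np.random.default_rng(0)
white = rng.normal(size=500)
q_white = ljung_box_q(white, 10)            # small: consistent with white noise
q_walk = ljung_box_q(np.cumsum(white), 10)  # huge: strong residual autocorrelation
assert q_walk > q_white
```

Under the white-noise null, \(Q\) is approximately chi-squared with `max_lag` degrees of freedom (fewer when applied to residuals of a fitted model), so values far above the corresponding quantile flag remaining autocorrelation.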